Skip to content

Add async transport #4614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 43 commits into from
Aug 13, 2025

Conversation

srothh
Copy link
Member

@srothh srothh commented Jul 23, 2025

Add async implementation of the abstract Transport class. This transport utilizes the async task worker as well as the httpcore async functionality.

Thread Safety: As capture_envelope is registered by the client as a callback for several background threads in the sdk, which are not running the event loop, capture_envelope in the transport is made to be thread safe and allow for execution on the event loop from other threads. The same is currently not the case for flush, as there does not seem to be a usage from background threads, however if necessary, it can also be added.

HTTP2 support: Currently not activated, but from the looks of the httpcore docs it should be as simple as setting the http2 in the init of the pool to true. This likely makes sense to support, as HTTP2 shows great performance improvements with concurrent requests.

Kill: The kill method is sync, but the pool needs to be closed asynchronously. Currently, this is done by launching a task. However, the task cannot be awaited in sync code without deadlocking, therefore kill followed by an immediate loop shutdown could technically lead to resource leakage. Therefore, I decided to make kill optionally return the async task, so it can be awaited if called from an async context.

Note also that parts of the code are very similar to the HTTP2 integration, as they both use the httpcore library. Maybe in a later PR there could be a shared superclass to avoid code duplication?

GH-4582

Copy link

codecov bot commented Jul 23, 2025

❌ 17 Tests Failed:

Tests completed Failed Passed Skipped
20742 17 20725 1098
View the top 3 failed test(s) by shortest run time
tests.integrations.redis.test_redis_cache_module::test_cache_data
Stack Traces | 0.084s run time
.../integrations/redis/test_redis_cache_module.py:125: in test_cache_data
    connection = FakeStrictRedis(host="mycacheserver.io", port=6378)
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'
tests.integrations.redis.test_redis::test_data_truncation_custom
Stack Traces | 0.085s run time
.../integrations/redis/test_redis.py:205: in test_data_truncation_custom
    connection = FakeStrictRedis()
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'
tests.integrations.redis.test_redis_cache_module::test_no_cache_basic
Stack Traces | 0.085s run time
.../integrations/redis/test_redis_cache_module.py:26: in test_no_cache_basic
    connection = FakeStrictRedis()
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@srothh srothh force-pushed the srothh/async-task-worker branch from 1a129f7 to 97c5e3d Compare July 23, 2025 14:04
@srothh srothh force-pushed the srothh/async-transport branch from 373942e to f01b00d Compare July 23, 2025 14:05
@srothh srothh marked this pull request as ready for review July 24, 2025 07:55
@srothh srothh requested a review from a team as a code owner July 24, 2025 07:55
cursor[bot]

This comment was marked as outdated.

cursor[bot]

This comment was marked as outdated.

cursor[bot]

This comment was marked as outdated.

srothh added 6 commits July 28, 2025 10:55
Add an abstract bass class for implementation of the background worker. This was done to provide a shared interface for the current
implementation of a threaded worker in the sync context as well as the upcoming async task-based worker implementation.

GH-4578
Add a new factory method instead of direct instatiation of the threaded background worker.
This allows for easy extension to other types of workers, such as the upcoming task-based async worker.

GH-4578
Add a new flush_async method to worker ABC. This is necessary because the async transport cannot use a
synchronous blocking flush.

GH-4578
Move the flush_async down to the concrete subclass to not break existing testing. This makes sense,
as this will only really be needed by the async worker anyway and therefore is not shared logic.

GH-4578
Coroutines have a return value, however the current function signature for the worker methods does not
accomodate for this. Therefore, this signature was changed.

GH-4578
cursor[bot]

This comment was marked as outdated.

srothh added 10 commits July 30, 2025 11:41
Add a new implementation of the worker interface, implementing the worker as an async task. This is
to be used by the upcoming async transport.

GH-4581
Refactor the flush method in the async worker to use the async_flush coroutine.

GH-4581
…unctions

Add a check to see wheter callbacks are awaitable coroutines or functions, as coroutines need to be awaited.

GH-4581
…coroutines

Coroutines do not return None, therefore it is necessary to consider this in the callback parameter of the worker. Previously,
only callbacks with return Type None were accepted.

GH-4581
Enable concurrent callbacks on async task worker by firing them as a task rather than awaiting them. A done callback handles the necessary queue and exception logic.

GH-4581
Changed kill to also use the _TERMINATOR sentinel, so the queue is still drained to this point on kill instead of cancelled immediately. This should also fix potential race conditions with flush_async.

GH-4581
Add proper type annotation to worker task list to fix linting problems

GH-4581
Refactor worker implementation to simplify callback processing, fix pending calculation and improve queue initialisation.

GH-4581
@srothh srothh force-pushed the srothh/async-task-worker branch from 97c5e3d to b5eda0e Compare July 30, 2025 12:07
@srothh srothh force-pushed the srothh/async-transport branch from c541bd7 to f5ef707 Compare July 30, 2025 13:12
@srothh srothh force-pushed the srothh/async-transport branch from 295a0e9 to 6cb72ad Compare July 31, 2025 12:11
cursor[bot]

This comment was marked as outdated.

@srothh
Copy link
Member Author

srothh commented Jul 31, 2025

The lint error is in an unrelated file, not sure why it is happening

Copy link
Member

@antonpirker antonpirker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good now! (the linting just fails because openfeature released something yesterday that breaks our linting. ignore this)

Base automatically changed from srothh/async-task-worker to srothh/transport-class-hierarchy August 12, 2025 15:30
asyncio.run_coroutine_threadsafe(
self._capture_envelope(envelope),
self.loop,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: AsyncHttpTransport Event Loop Handling Errors

The AsyncHttpTransport.capture_envelope method has two event loop handling issues:

  1. It incorrectly uses asyncio.create_task on the current thread's running loop, which may not be self.loop, causing tasks to run on an unintended event loop.
  2. A race condition exists where self.loop.is_running() can be true, but the loop stops before asyncio.run_coroutine_threadsafe() is called, leading to a RuntimeError.
Fix in Cursor Fix in Web

Copy link
Member

@sl0thentr0py sl0thentr0py left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cosmetic review first, will review logic now

def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
# For now, we only support the threaded sync background worker.
return BackgroundWorker(queue_size=options["transport_queue_size"])
def _create_worker(self, options: dict[str, Any]) -> Worker:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_create_worker in the shared class should be NotImplemented while the respective sync and async classes should implement only their own version.

transport_cls: Type[Transport] = (
Http2Transport if use_http2_transport else HttpTransport
)
if use_async_transport:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit on writing good nested/multiple conditionals, each branch should only be responsible for its own logic, so a bit cleaner the other way around

    transport_cls = Http2Transport if use_http2_transport else HttpTransport
    if use_async_transport:
        try:
            asyncio.get_running_loop()
            transport_cls: Type[Transport] = AsyncHttpTransport
        except RuntimeError:
            # No event loop running, fall back to sync transport
            logger.warning("No event loop running, falling back to sync transport.")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored it :)

def __init__(self: Self, options: Dict[str, Any]) -> None:
super().__init__(options)
logger.warning(
"You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's gather all the required warnings in one place, so move this logger statement and check to make_transport below and then this can just become

AsyncHttpTransport = BaseHttpTransport

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Refactor transport code based on PR suggestions. Furthermore, add a requirement for the async extra in setup.py

GH-4582
return self.loop.create_task(self._pool.aclose()) # type: ignore
except RuntimeError:
logger.warning("Event loop not running, aborting kill.")
return None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Async Transport Flush Issues

The AsyncHttpTransport.flush() method submits an unawaited coroutine to the AsyncWorker by calling the async _flush_client_reports method within a synchronous lambda. This prevents client reports from being flushed and may cause runtime warnings.

Furthermore, the AsyncHttpTransport.background_tasks set is initialized but never populated, rendering the task cancellation logic in kill() ineffective.

Lastly, AsyncHttpTransport.flush() changes its return type from None (in BaseHttpTransport) to Optional[asyncio.Task[None]], breaking the interface contract and potentially indicating a type mismatch with _worker.flush.

Fix in Cursor Fix in Web

try:
ASYNC_TRANSPORT_ENABLED = httpcore is not None
except ImportError:
ASYNC_TRANSPORT_ENABLED = False
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Redundant ImportError Handling

The try/except ImportError block for ASYNC_TRANSPORT_ENABLED is redundant. The httpcore module's availability is already determined before this block, so the expression httpcore is not None cannot raise an ImportError. This makes the except clause unreachable dead code.

Fix in Cursor Fix in Web

return self.loop.create_task(self._pool.aclose()) # type: ignore
except RuntimeError:
logger.warning("Event loop not running, aborting kill.")
return None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: AsyncHttpTransport Kill Method Fails Resource Cleanup

The AsyncHttpTransport's kill() method has two main problems:

  1. Incomplete Resource Cleanup: The background_tasks set, intended for tracking, is never populated, rendering the task cancellation logic ineffective. Additionally, the _pool.aclose() task created by kill() is not tracked, potentially causing resource leaks if not explicitly awaited by the caller.
  2. Interface Violation: The method's return type is changed from None (as defined in the base Transport interface) to Optional[asyncio.Task[None]], violating the Liskov Substitution Principle.
Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove the background task set, it should not be necessary anymore I think

logger.debug("Flushing HTTP transport")

if timeout > 0:
self._worker.submit(lambda: self._flush_client_reports(force=True))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Async Method Not Awaited Causes Flush Failure

The AsyncHttpTransport.flush method submits a synchronous lambda to the worker that calls the async _flush_client_reports method without awaiting it. This results in a coroutine object being submitted instead of the function executing, preventing client reports from being flushed. The correct pattern, as seen in _capture_envelope, is to use an async wrapper.

Fix in Cursor Fix in Web

try:
ASYNC_TRANSPORT_ENABLED = httpcore is not None
except ImportError:
ASYNC_TRANSPORT_ENABLED = False
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Redundant Import Handling in Async Transport

The try/except ImportError block around ASYNC_TRANSPORT_ENABLED = httpcore is not None is redundant. No import statement exists within the try block, rendering the except ImportError clause unreachable dead code. The ASYNC_TRANSPORT_ENABLED flag is simply assigned based on the prior import status of httpcore, making the try/except structure unnecessary and misleading.

Fix in Cursor Fix in Web

},
)

async def _flush_client_reports(self: Self, force: bool = False) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this does not need to be async def anymore and can be moved to shared?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging it in, we will take care of this when we do the mixin

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is async because otherwise the worker process_callback needs the case distinction between coroutines and sync functions.

See:

#4591 (comment)
#4614 (comment)

@sl0thentr0py sl0thentr0py merged commit 35d7078 into srothh/transport-class-hierarchy Aug 13, 2025
121 of 125 checks passed
@sl0thentr0py sl0thentr0py deleted the srothh/async-transport branch August 13, 2025 16:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants